package com.xiaofan.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, Elasticsearch, FileSystem, Json, Schema}

/**
 * Demo job: reads sensor readings from a CSV file, counts the readings per
 * sensor id and writes the per-id counts to an Elasticsearch index.
 *
 * Usage: `EsOutputTest [inputPath]` - when no argument is given,
 * [[EsOutputTest.DefaultInputPath]] is read.
 */
object EsOutputTest {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Builds the source table, the aggregation and the Elasticsearch sink,
   * then submits the insert job.
   *
   * @param args optional; `args(0)` overrides the input file path
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Fall back to the original hard-coded location so existing runs behave the same.
    val filePath: String = args.headOption.getOrElse(DefaultInputPath)

    // Source: CSV file registered as "inputTable" with columns (id, timestamp, temp).
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    val sensorTable: Table = tableEnv.from("inputTable")

    // Aggregation: number of readings per sensor id. The alias matches the
    // sink column name ("cnt") so query and sink schema read consistently.
    val aggTable: Table = sensorTable
      .groupBy($"id")
      .select($"id", $"id".count as "cnt")

    // Sink: Elasticsearch index "sensor_es_1", JSON documents, registered in
    // upsert mode as "esOutputTable" with columns (id, cnt).
    tableEnv.connect(new Elasticsearch()
      .version("7")
      .host("192.168.157.11", 9200, "http")
      .index("sensor_es_1")
      .documentType("sensor")
    )
      .inUpsertMode()
      .withFormat(new Json())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("cnt", DataTypes.BIGINT())
      )
      .createTemporaryTable("esOutputTable")

    // NOTE(review): executeInsert may return before the job has finished; if
    // the program exits early in a local/IDE run, block on the returned
    // TableResult (e.g. `await()` where the Flink version provides it) - confirm.
    aggTable.executeInsert("esOutputTable")

  }

}
